package com.leilei.join.common;

import cn.hutool.core.util.RandomUtil;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Vehicle data source: a Flink {@link SourceFunction} that emits one randomly
 * generated {@link Vehicle} roughly every two seconds until it is cancelled.
 *
 * <p>Each emitted vehicle uses the same random integer for both its {@code id}
 * and its {@code type}.
 *
 * <p>Thread-safety: {@link #cancel()} is expected to be invoked from a thread
 * other than the one executing {@link #run(SourceContext)}, so the running flag
 * is declared {@code volatile} to guarantee the loop observes the cancellation.
 *
 * @author lei
 * @version 1.0
 * @date 2021/3/27 16:52
 */
public class VehicleSource implements SourceFunction<Vehicle> {

    /** Pause between two consecutive emissions, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 2000L;

    /**
     * Loop guard for {@link #run(SourceContext)}; cleared by {@link #cancel()}.
     * Must be {@code volatile}: it is written and read by different threads, and
     * a plain (or boxed) field gives no visibility guarantee for that write.
     */
    private volatile boolean running = true;

    /**
     * Emits vehicles in a loop until {@link #cancel()} clears the running flag.
     *
     * @param ctx the context used to emit generated vehicles
     * @throws Exception if emitting fails or the sleeping thread is interrupted
     *                   (the {@link InterruptedException} is propagated as-is)
     */
    @Override
    public void run(SourceContext<Vehicle> ctx) throws Exception {
        while (running) {
            // NOTE(review): Hutool's randomInt(min, max) presumably excludes the upper
            // bound, which would yield ids 1..3 - confirm against the Hutool docs.
            int vehicleId = RandomUtil.randomInt(1, 4);
            // The same value is deliberately used for both id and type.
            Vehicle vehicle = Vehicle.builder()
                    .id(vehicleId)
                    .type(vehicleId)
                    .build();
            ctx.collect(vehicle);
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }

    /**
     * Requests termination of the emit loop. The loop exits the next time it
     * checks the flag, i.e. after the current sleep completes or is interrupted.
     */
    @Override
    public void cancel() {
        running = false;
    }
}